package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.DimUtil;
import com.atguigu.gmall.realtime.utils.ThreadPoolUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;

/**
 * Author: Felix
 * Date: 2022/4/28
 * Desc: 发送异步请求
 * 模板方法设计模式
 * 在父类中定义完成某一个功能的核心算法骨架(实现步骤)，但是具体的实现要延迟到子类中去完成。
 * 在不改变父类核心算法骨架的前提下，每一个子类都可以有自己不同的实现。
 */
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T, T> implements DimJoinFunction<T>{

    private ExecutorService executorService;
    private String tableName;

    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = ThreadPoolUtil.getInstance();
    }

    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        //开启多个线程，发送异步请求
        executorService.submit(
            new Runnable() {
                @Override
                public void run() {
                    try {
                        long start = System.currentTimeMillis();
                        //维度关联操作
                        //1.根据流中的对象获取要关联的维度的主键
                        String key = getKey(obj);
                        //2.根据主键 使用自己封装的工具类DimUtil查询维度对象
                        JSONObject dimInfoJsonObj = DimUtil.getDimInfo(tableName, key);
                        //3.将维度对象中维度属性  补充给流中的对象
                        if (dimInfoJsonObj != null) {
                            join(obj, dimInfoJsonObj);
                        }
                        long end = System.currentTimeMillis();
                        System.out.println("异步维度关联耗时:" + (end - start) + "毫秒");

                        //获取数据库交互的结果并发送给 ResultFuture的回调函数
                        resultFuture.complete(Collections.singleton(obj));
                    } catch (Exception e) {
                        e.printStackTrace();
                        throw  new RuntimeException("异步维度关联发生了异常~~~");
                    }
                }
            }
        );

    }
}
